///
/// 提供spsc/mpmc/spmc/mpsc几种管道，所有的管道都支持利用epoll/select来监听读写事件
///
/// # Example
///
/// ```rust
/// use hirun::channel::spsc;
///
/// let (sender, receiver) = spsc::channel::<i32>(10).unwrap();
///
/// let handle = std::thread::spawn(move || {
///     for i in 0..1000 {
///         sender.send(i);
///     }
/// });
///
/// for i in 0..1000 {
///     let data = receiver.recv();
///     assert_eq!(i, data);
/// }
/// assert_eq!(None, receiver.try_recv());
/// handle.join().unwrap();
/// ```
/// # Example
///
/// ```rust
/// use core::mem::MaybeUninit;
/// use core::slice;
/// use hirun::channel::mpsc;
///
/// fn to_uninit<T>(datas: &[T]) -> &[MaybeUninit<T>] {
///     unsafe {
///         let ptr = &datas[0] as *const T as *const MaybeUninit<T>;
///         slice::from_raw_parts(ptr, datas.len())
///     }
/// }
///
/// let (sender1, receiver) = mpsc::channel::<i32>(4).unwrap();
/// let sender2 = sender1.clone();
///
/// let handle1 = std::thread::spawn(move || {
///     let datas = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
///     sender1.send_slice(to_uninit(&datas[..]));
/// });
///
/// let handle2 = std::thread::spawn(move || {
///     let datas = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
///     sender2.send_slice(to_uninit(&datas[..]));
/// });
///
/// for i in 0..21 {
///     let data = receiver.recv();
///     assert!(data < 21 && data >= 0);
/// }
/// assert_eq!(None, receiver.try_recv());
/// handle1.join().unwrap();
/// handle2.join().unwrap();
/// ```
///
/// # Example
///
/// ```rust
/// use hirun::channel::spmc;
/// use core::time::Duration;
///
/// let (sender, receiver1) = spmc::channel::<i32>(10).unwrap();
/// let receiver2 = receiver1.clone();
///
/// let handle1 = std::thread::spawn(move || {
///     let mut old = -1;
///     while let Some(val) = receiver1.recv_timeout(Duration::new(1, 0)) {
///         assert!(val >= 0 && val < 10000);
///         assert!(val > old);
///         old = val;
///     }
/// });
///
/// let handle2 = std::thread::spawn(move || {
///     let mut old = -1;
///     while let Some(val) = receiver2.recv_timeout(Duration::new(1, 0)) {
///         assert!(val >= 0 && val < 10000);
///         assert!(val > old);
///         old = val;
///     }
/// });
///
/// for i in 0..10000 {
///     sender.send(i);
/// }
///
/// handle1.join().unwrap();
/// handle2.join().unwrap();
/// ```
///
/// # Example
///
/// ```rust
/// use hirun::channel::mpmc;
/// use core::time::Duration;
///
/// let (sender1, receiver1) = mpmc::channel::<i32>(10).unwrap();
/// let receiver2 = receiver1.clone();
/// let sender2 = sender1.clone();
///
/// let handle1 = std::thread::spawn(move || {
///     while let Some(val) = receiver1.recv_timeout(Duration::new(1, 0)) {
///         assert!(val >= 0 && val < 10000);
///     }
/// });
///
/// let handle2 = std::thread::spawn(move || {
///     while let Some(val) = receiver2.recv_timeout(Duration::new(1, 0)) {
///         assert!(val >= 0 && val < 10000);
///     }
/// });
///
/// let handle3 = std::thread::spawn(move || {
///     for i in 0..5000 {
///         sender1.send(i);
///     }
/// });
///
/// let handle4 = std::thread::spawn(move || {
///     for i in 5000..10000 {
///         sender2.send(i);
///     }
/// });
///
/// handle1.join().unwrap();
/// handle2.join().unwrap();
/// handle3.join().unwrap();
/// handle4.join().unwrap();
/// ```
///
use core::mem::{self, MaybeUninit};
use core::ptr;
use core::sync::atomic::{AtomicU16, Ordering::Relaxed};
use core::time::Duration;
extern crate hipool as pool;

cfg_if::cfg_if! {
    if #[cfg(any(target_os = "linux", target_os = "android"))] {
        mod linux;
        type EventFd = linux::EventFd;
    } else {
    }
}

mod token;
pub use token::*;

mod status;
pub(crate) use status::*;

pub(crate) mod pipe;

pub mod mpmc;
pub mod mpsc;
pub mod spmc;
pub mod spsc;

pub trait Notify: Default {
    fn notify(&self, from: u8, to: u8);
    fn wait_read(&self, timeout: Option<Duration>) -> Option<Duration>;
    fn wait_write(&self, timeout: Option<Duration>) -> Option<Duration>;
    fn read_fd_event(&self) -> (i32, bool);
    fn write_fd_event(&self) -> (i32, bool);
}

pub struct NullNotify;

impl Default for NullNotify {
    fn default() -> Self {
        NullNotify
    }
}

impl Notify for NullNotify {
    fn notify(&self, _from: u8, _to: u8) {}
    fn wait_read(&self, _timeout: Option<Duration>) -> Option<Duration> {
        None
    }
    fn wait_write(&self, _timeout: Option<Duration>) -> Option<Duration> {
        None
    }
    fn read_fd_event(&self) -> (i32, bool) {
        (-1, true)
    }
    fn write_fd_event(&self) -> (i32, bool) {
        (-1, true)
    }
}

struct Queue<'a, T> {
    status: AtomicStatus,
    queue: &'a mut [MaybeUninit<T>],
}

impl<T> Drop for Queue<'_, T> {
    fn drop(&mut self) {
        if !mem::needs_drop::<T>() {
            return;
        }
        let (r, _, n) = self.status.load(Relaxed);
        let mut pos = self.idx(r as usize);
        for _ in 0..n as usize {
            unsafe {
                self.queue[pos].assume_init_drop();
            }
            pos = self.next(pos);
        }
    }
}

pub type SWriteIdx = ();
pub type SReadIdx = ();
pub type MWriteIdx = AtomicU16;
pub type MReadIdx = AtomicU16;

impl<'a, T> Queue<'a, T> {
    pub(crate) fn new(queue: &'a mut [MaybeUninit<T>]) -> Self {
        debug_assert!(check_len(queue.len()));
        Self {
            queue,
            status: AtomicStatus::new(),
        }
    }
}

impl<T> Queue<'_, T> {
    pub(crate) fn sp_push_one<N>(&self, _idx: &SWriteIdx, data: T, notify: &N) -> Result<(), T>
    where
        N: Notify,
    {
        let (w, c) = self.status.fetch_widx(1, self.len());
        if c > 0 {
            unsafe { self.write(w as usize, data) };
            self.status
                .update_widx(w, w.wrapping_add(1), self.len(), notify);
            return Ok(());
        }
        Err(data)
    }

    pub(crate) fn sp_push_slice<N>(
        &self,
        _idx: &SWriteIdx,
        datas: &[MaybeUninit<T>],
        notify: &N,
    ) -> Option<usize>
    where
        N: Notify,
    {
        let (w, c) = self.status.fetch_widx(datas.len() as u16, self.len());
        if c > 0 {
            unsafe {
                self.write_slice(w as usize, &datas[..c as usize]);
            }
            self.status
                .update_widx(w, w.wrapping_add(c), self.len(), notify);
            return Some(c as usize);
        }
        None
    }

    pub(crate) fn sc_pop_one<N>(&self, idx: &SReadIdx, notify: &N) -> Option<T>
    where
        N: Notify,
    {
        let mut ret = None;
        self.sc_pop(idx, 1, notify, |_, ridx| {
            ret = unsafe { Some(self.read(ridx)) };
        });
        ret
    }

    pub(crate) fn sc_pop_slice<N>(
        &self,
        idx: &SReadIdx,
        datas: &mut [MaybeUninit<T>],
        notify: &N,
    ) -> Option<usize>
    where
        N: Notify,
    {
        self.sc_pop(idx, datas.len(), notify, |cnt, pos| unsafe {
            self.read_slice(pos, &mut datas[0..cnt]);
        })
    }

    pub(crate) fn mp_push_one<N>(
        &self,
        idx: &MWriteIdx,
        data: T,
        notify: &N,
        token: &Token,
    ) -> Result<(), T>
    where
        N: Notify,
    {
        let (w, c) = self.status.fetch_widx_with(idx, 1, self.len());
        if c > 0 {
            unsafe { self.write(w as usize, data) };
            self.status
                .update_widx_with(w, w.wrapping_add(1), self.len(), notify, token);
            return Ok(());
        }
        Err(data)
    }

    pub(crate) fn mp_push_slice<N>(
        &self,
        idx: &MWriteIdx,
        datas: &[MaybeUninit<T>],
        notify: &N,
        token: &Token,
    ) -> Option<usize>
    where
        N: Notify,
    {
        let (w, c) = self
            .status
            .fetch_widx_with(idx, datas.len() as u16, self.len());
        if c > 0 {
            unsafe {
                self.write_slice(w as usize, &datas[..c as usize]);
            }
            self.status
                .update_widx_with(w, w.wrapping_add(c), self.len(), notify, token);
            return Some(c as usize);
        }
        None
    }

    pub(crate) fn mc_pop_one<N>(&self, idx: &MReadIdx, notify: &N, token: &Token) -> Option<T>
    where
        N: Notify,
    {
        let mut ret = None;
        self.mc_pop(idx, 1, notify, token, |_, pos| {
            ret = Some(unsafe { self.read(pos) });
        });
        ret
    }

    pub(crate) fn mc_pop_slice<N>(
        &self,
        idx: &MReadIdx,
        datas: &mut [MaybeUninit<T>],
        notify: &N,
        token: &Token,
    ) -> Option<usize>
    where
        N: Notify,
    {
        self.mc_pop(idx, datas.len(), notify, token, |cnt, pos| unsafe {
            self.read_slice(pos, &mut datas[..cnt]);
        })
    }

    unsafe fn write(&self, idx: usize, data: T) {
        ptr::write(
            self.queue.get_unchecked(self.idx(idx)).as_ptr() as *mut T,
            data,
        );
    }

    unsafe fn read(&self, idx: usize) -> T {
        self.queue.get_unchecked(self.idx(idx)).assume_init_read()
    }

    unsafe fn read_slice(&self, pos: usize, datas: &mut [MaybeUninit<T>]) {
        let pos = self.idx(pos);
        let end = self.idx(pos.wrapping_add(datas.len()));
        if end > pos {
            ptr::copy_nonoverlapping(
                self.queue.get_unchecked(pos).as_ptr(),
                datas.get_unchecked(0).as_ptr().cast_mut(),
                datas.len(),
            );
        } else {
            let len = self.queue.len() - pos;
            ptr::copy_nonoverlapping(
                self.queue.get_unchecked(pos).as_ptr(),
                datas.get_unchecked(0).as_ptr().cast_mut(),
                len,
            );
            // #7 确保data.get_unchecked(len)不会越界.
            if len < datas.len() {
                ptr::copy_nonoverlapping(
                    self.queue.get_unchecked(0).as_ptr(),
                    datas.get_unchecked(len).as_ptr().cast_mut(),
                    datas.len() - len,
                );
            }
        }
    }

    #[allow(dead_code)]
    unsafe fn write_slice(&self, pos: usize, datas: &[MaybeUninit<T>]) {
        let pos = self.idx(pos);
        let end = self.idx(pos.wrapping_add(datas.len()));
        if end > pos {
            ptr::copy_nonoverlapping(
                datas.get_unchecked(0).as_ptr(),
                self.queue.get_unchecked(pos).as_ptr().cast_mut(),
                datas.len(),
            );
        } else {
            let len = self.queue.len() - pos;
            ptr::copy_nonoverlapping(
                datas.get_unchecked(0).as_ptr(),
                self.queue.get_unchecked(pos).as_ptr().cast_mut(),
                len,
            );
            // #7 确保data.get_unchecked(len)不会越界.
            if len < datas.len() {
                ptr::copy_nonoverlapping(
                    datas.get_unchecked(len).as_ptr(),
                    self.queue.get_unchecked(0).as_ptr().cast_mut(),
                    datas.len() - len,
                );
            }
        }
    }

    fn idx(&self, idx: usize) -> usize {
        idx & (self.queue.len() - 1)
    }

    fn next(&self, idx: usize) -> usize {
        self.idx(idx + 1)
    }

    fn sc_pop<N, F>(&self, _idx: &SReadIdx, cnt: usize, notify: &N, f: F) -> Option<usize>
    where
        N: Notify,
        F: FnOnce(usize, usize),
    {
        let (r, c) = self.status.fetch_ridx(cnt as u16);
        if c > 0 {
            f(c as usize, r as usize);
            self.status
                .update_ridx(r, r.wrapping_add(c), self.len(), notify);
            return Some(c as usize);
        }
        None
    }

    fn mc_pop<N, F>(
        &self,
        idx: &MReadIdx,
        cnt: usize,
        notify: &N,
        token: &Token,
        f: F,
    ) -> Option<usize>
    where
        N: Notify,
        F: FnOnce(usize, usize),
    {
        let (r, c) = self.status.fetch_ridx_with(idx, cnt as u16);
        if c > 0 {
            f(c as usize, r as usize);
            self.status
                .update_ridx_with(r, r.wrapping_add(c), self.len(), notify, token);
            return Some(c as usize);
        }
        None
    }

    fn len(&self) -> u16 {
        self.queue.len() as u16
    }
}

const fn check_len(len: usize) -> bool {
    (len & (len - 1)) == 0
}
